package com.example.event.rocket;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationEventPublisher;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@EnableConfigurationProperties(EventProperties.class)
public class RocketMqConsumer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    /** Class 缓存*/
    private ConcurrentMap<String, Class<?>> name2Class = new ConcurrentHashMap<>();

    @Resource
    private EventProperties eventProperties;

    private DefaultMQPushConsumer consumer;

    @Resource
    private ApplicationEventPublisher applicationEventPublisher;

    @Value(("${spring.application.name}"))
    private String appName;

    @PostConstruct
    private void init() {
        try {
            initConsumer();
        } catch (Exception e) {
            logger.error("初始化EventBus：RocketMq发生未知错误", e);
        }
    }

    void initConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(appName);
        consumer.setNamesrvAddr(eventProperties.getNameServer());
        //集群模式
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
//        consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        consumer.subscribe(eventProperties.getTopic(), eventProperties.getTag());
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt msg = msgs.get(0);
                try {
                    String body = new String(msg.getBody(), StandardCharsets.UTF_8);
                    RemoteEvent remoteEvent = JSON.parseObject(body, RemoteEvent.class);
                    Class<?> clazz = getClazz(remoteEvent.getClazz());
                    Object event = JSON.parseObject(remoteEvent.getBody(), clazz);
                    //发布事件
                    applicationEventPublisher.publishEvent(event);
                } catch (Exception e) {
                    logger.error("处理消息发生错误[{}]", new String(msg.getBody(), StandardCharsets.UTF_8), e);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }


    private Class<?> getClazz(String name) throws ClassNotFoundException {
        if (name2Class.containsKey(name)) {
            return name2Class.get(name);
        }
        try {
            Class<?> clazz = Class.forName(name);
            Class<?> prev = name2Class.putIfAbsent(name, clazz);
            return prev != null ? prev : clazz;
        } catch (ClassNotFoundException e) {
            throw e;
        }
    }

    @PreDestroy
    public void stop() {
        consumer.shutdown();
    }
}